/**
 * FileName: Col2MutiColsProcessImpl
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/2/14 12:40
 * Description: Does not process the data content itself; it only converts
 *              the single-column data into multiple columns.
 */
package cn.com.bonc.process.impl;

import cn.com.bonc.process.Process;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.ProcessCfgUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;

/**
 * {@link Process} step that expands the single {@code value} column of the
 * incoming dataset into multiple columns. The column layout and the split
 * pattern are both supplied by {@link ColumnsUtil}.
 */
public class Col2MutiColsProcessImpl implements Process {

    /**
     * Splits the {@code value} column of the given dataset into multiple columns.
     *
     * <p>When {@link ProcessCfgUtil#isWindowAggregate()} is {@code true} and the
     * dataset has more than one column, the {@code timestamp} column is carried
     * along next to the columns derived from {@code value}. In every other case
     * the work is delegated to
     * {@code ColumnsUtil.getInstance().getMultiColumnDataset(rowDataset, "value")}.
     *
     * @param rowDataset dataset holding a {@code value} column; on the
     *                   window-aggregate path it must also hold a
     *                   {@code timestamp} column
     * @return the dataset with {@code value} expanded into multiple columns
     */
    @Override
    public Dataset<Row> processing(Dataset<Row> rowDataset) {
        // Guard clause: without window aggregation, or with a single-column
        // input, the plain multi-column conversion is used.
        // NOTE(review): the check only counts columns; presumably the extra
        // column is "timestamp" - confirm against the caller.
        if (!ProcessCfgUtil.isWindowAggregate() || rowDataset.columns().length <= 1) {
            return ColumnsUtil.getInstance().getMultiColumnDataset(rowDataset, "value");
        }

        // Keep only value (as string) and timestamp, then split value into the
        // temporary array column "tmp" using the configured pattern.
        Dataset<Row> withSplitValues = rowDataset
                .select(col("value").cast("string"), col("timestamp"))
                .withColumn("tmp", split(col("value"), ColumnsUtil.getRegex()));

        // Project the columns built from "tmp" together with timestamp, and
        // drop the temporary column from the result.
        return withSplitValues
                .select(ColumnsUtil.getInstance().combineColumns(
                        ColumnsUtil.getInstance().getColumns("tmp"),
                        col("timestamp")))
                .drop(col("tmp"));
    }
}
